package cn._51doit.flink.day06;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.HashMap;
import java.util.Map;

/**
 * 将ValueState的底层实现
 *
 * Flink的State分为两种：KeyedState(KeyBy之后对应的State)，和OperatorState（没有keyBy的State）
 *
 * ValueState是KeyedState中的一种
 *
 * 1.KeyedState底层是一个Map结构
 * 2.如果想要容错，必须开启checkpointing，并且按照Flink的状态编程API进行编程（将中间结果保存都Flink特殊的变量中）
 *
 * 使用Flink的ValueState编程API实现WordCount的功能
 *
 */
public class ValueStateDemo3 {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //开启checkpoint
        env.enableCheckpointing(5000);
        //设置重启策略
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, 5000));

        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);

        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
                if (line.startsWith("error")) {
                    throw new RuntimeException("有错误数据出现，抛出异常！");
                }
                String[] words = line.split(" ");
                for (String word : words) {
                    out.collect(Tuple2.of(word, 1));
                }
            }
        });

        KeyedStream<Tuple2<String, Integer>, String> keyedStream = wordAndOne.keyBy(t -> t.f0);

        SingleOutputStreamOperator<Tuple2<String, Integer>> res = keyedStream.map(new RichMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {

            private ValueState<Integer> valueState;
            //在open方法中初始化状态或恢复状态
            @Override
            public void open(Configuration parameters) throws Exception {
                //定义状态描述器(描述状态的类型、名称)
                ValueStateDescriptor<Integer> stateDescriptor = new ValueStateDescriptor<>("wc-state", Integer.class);
                //初始化或恢复状态
                valueState = getRuntimeContext().getState(stateDescriptor);
            }

            @Override
            public Tuple2<String, Integer> map(Tuple2<String, Integer> input) throws Exception {
                Integer current = input.f1;
                //内部会获取当前的key，根据当前的key取出对应的value
                Integer history = valueState.value();
                if (history == null) {
                    history = 0;
                }
                current += history;
                //更新状态
                valueState.update(current);
                //输出数据
                input.f1 = current;
                return input;
            }
        });

        res.print();

        env.execute();


    }
}
